﻿using IOP.Models;
using IOP.Models.Message.MQTT;
using IOP.Models.Message.MQTT.Package;
using IOP.Models.Tree.BinaryTree;
using IOP.Pulsar.MQTT.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace IOP.Pulsar.MQTT
{
    /// <summary>
    /// A client session. It tracks the topics a client is subscribed to, forwards
    /// publishes from those topics through <see cref="OnTransmit"/>, and publishes
    /// the cached will packages when it is disposed.
    /// </summary>
    public class Session : IDisposable
    {
        /// <summary>
        /// Client identifier this session belongs to.
        /// </summary>
        public string ClientId { get; private set; }
        /// <summary>
        /// Raised for every package that has to be forwarded to the client
        /// (first transmission as well as re-transmissions).
        /// </summary>
        public event Action<PublishPackage> OnTransmit;
        /// <summary>
        /// Resend count handed to each <c>RepeatTask</c> created for QoS &gt; 0 packages.
        /// </summary>
        public int RepeatCount { get; set; } = 3;
        /// <summary>
        /// Resend interval handed to each <c>RepeatTask</c>
        /// (presumably milliseconds - confirm against <c>RepeatTask</c>).
        /// </summary>
        public double RepeatInterval { get; set; } = 10000;
        /// <summary>
        /// Topics this session is subscribed to, keyed by topic name.
        /// The constructor seeds a "/" entry whose value is <c>null</c>, so every
        /// consumer must tolerate <c>null</c> values.
        /// </summary>
        internal ConcurrentRBTree<string, SubscribedTopic> Topics { get; set; } = new ConcurrentRBTree<string, SubscribedTopic>();
        /// <summary>
        /// Cached will packages; they are published when the session is disposed.
        /// </summary>
        public ConcurrentBag<PublishPackage> WillPackages { get; } = new ConcurrentBag<PublishPackage>();
        /// <summary>
        /// Topic center used to resolve topics, create packet identifiers and cache
        /// packages that await completion.
        /// </summary>
        public readonly ITopicCenter _TopicCenter;
        /// <summary>
        /// Whether the session cache is to be cleared (value supplied by the creator).
        /// </summary>
        public readonly bool ClearSession;
        /// <summary>
        /// Set once <see cref="Dispose"/> has run; checked by the publish handler.
        /// </summary>
        private volatile bool IsDispose = false;
        /// <summary>
        /// Monitor guarding subscribe / unsubscribe / dispose.
        /// </summary>
        private readonly object _SyncRoot = new object();
        /// <summary>
        /// Spin lock serializing <see cref="Topic_OnPublish"/>. Must stay non-readonly
        /// because <see cref="SpinLock"/> is a mutable struct.
        /// </summary>
        private SpinLock _SpinLock = new SpinLock();

        /// <summary>
        /// Creates a session.
        /// </summary>
        /// <param name="clientId">Client identifier.</param>
        /// <param name="center">Topic center.</param>
        /// <param name="clearSession">Whether the session cache is to be cleared.</param>
        public Session(string clientId, ITopicCenter center, bool clearSession = false)
        {
            ClientId = clientId;
            _TopicCenter = center;
            ClearSession = clearSession;
            // Placeholder entry with a null value; see the remarks on Topics.
            Topics.Put("/", null);
        }

        /// <summary>
        /// Removes every subscription whose topic name matches one of the filters in
        /// <paramref name="package"/> and detaches the publish handler from it.
        /// </summary>
        /// <param name="package">Unsubscribe package carrying the topic filters.</param>
        public void Unsubscribe(UnsubscribePackage package)
        {
            lock (_SyncRoot)
            {
                foreach (var filter in package.TopicFilters)
                {
                    // Work on a snapshot so the tree is never modified while it is
                    // being enumerated.
                    foreach (var entry in SnapshotTopics())
                    {
                        // The "/" placeholder (and any other null-valued entry) is not a
                        // real subscription: there is no handler to detach, and
                        // dereferencing it used to throw NullReferenceException.
                        if (entry.Value == null)
                        {
                            continue;
                        }

                        if (entry.Key.MatchWildcard(filter))
                        {
                            entry.Value.Topic.OnPublish -= Topic_OnPublish;
                            Topics.Delete(entry.Key);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Subscribes the session to every topic matching the filters of
        /// <paramref name="package"/>.
        /// </summary>
        /// <param name="package">Subscribe package.</param>
        /// <returns>
        /// One code per filter, in filter order: the requested QoS when at least one
        /// topic matched, otherwise <c>SubackCode.FailedQoS</c>.
        /// </returns>
        public List<SubackCode> Subscribe(SubscribePackage package)
        {
            lock (_SyncRoot)
            {
                var result = new List<SubackCode>();
                foreach (var item in package.TopicFilters)
                {
                    var items = _TopicCenter.GetTopics(item.FilterName);
                    if (!items.Any())
                    {
                        result.Add(SubackCode.FailedQoS);
                        continue;
                    }

                    foreach (var topic in items)
                    {
                        // Already-subscribed topics are left untouched so the publish
                        // handler is never attached twice.
                        if (!Topics.Contains(topic.TopicName, out SubscribedTopic _))
                        {
                            CreateSubscribedTopic(topic, item.RequestedQoS);
                        }
                    }
                    result.Add((SubackCode)item.RequestedQoS);
                }
                return result;
            }
        }

        /// <summary>
        /// Subscribes the session to a list of cached subscriptions.
        /// </summary>
        /// <param name="topics">Cached subscriptions (topic name plus subscribed QoS).</param>
        internal void Subscribe(List<CacheSubscribedTopic> topics)
        {
            // Same lock as the public overload: both mutate Topics and attach handlers.
            lock (_SyncRoot)
            {
                foreach (var item in topics)
                {
                    var topic = _TopicCenter.GetOrCreateTopic(item.Topic);
                    // Mirror the public overload: attaching Topic_OnPublish twice to the
                    // same topic would deliver every publish twice.
                    if (!Topics.Contains(topic.TopicName, out SubscribedTopic _))
                    {
                        CreateSubscribedTopic(topic, item.SubscribedQoS);
                    }
                }
            }
        }

        /// <summary>
        /// Publishes the cached will packages, then detaches the session from every
        /// subscribed topic and clears its state. Calling it again is a no-op.
        /// </summary>
        public void Dispose()
        {
            lock (_SyncRoot)
            {
                if (IsDispose)
                {
                    return;
                }

                try
                {
                    foreach (var will in WillPackages)
                    {
                        var topic = _TopicCenter.GetOrCreateTopic(will.TopicName);
                        topic.Publish(will);
                    }
                }
                finally
                {
                    // Runs even if publishing a will throws, so topic handlers are
                    // never left attached to a dead session.
                    OnTransmit = null;
                    IsDispose = true;
                    foreach (var entry in SnapshotTopics())
                    {
                        if (entry.Value != null)
                        {
                            entry.Value.Topic.OnPublish -= Topic_OnPublish;
                        }
                        Topics.Delete(entry.Key);
                    }
                    WillPackages.Clear();
                }
            }
        }

        /// <summary>
        /// Copies the current entries of <see cref="Topics"/> into a list so callers
        /// can delete from the tree while walking the copy.
        /// </summary>
        /// <returns>Key/value pairs in the tree's enumeration order.</returns>
        private List<KeyValuePair<string, SubscribedTopic>> SnapshotTopics()
        {
            var snapshot = new List<KeyValuePair<string, SubscribedTopic>>();
            foreach (var entry in Topics)
            {
                snapshot.Add(new KeyValuePair<string, SubscribedTopic>(entry.Key, entry.Value));
            }
            return snapshot;
        }

        /// <summary>
        /// Registers a subscription for <paramref name="topic"/>, attaches the publish
        /// handler and, when the topic has stored data, immediately forwards the first
        /// stored package with the retain flag set.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="qoSType">QoS granted for the subscription.</param>
        private void CreateSubscribedTopic(Topic topic, QoSType qoSType)
        {
            var subscribed = new SubscribedTopic
            {
                SubscribedQoS = qoSType,
                Topic = topic
            };
            topic.OnPublish += Topic_OnPublish;
            Topics.Put(topic.TopicName, subscribed);

            var retain = topic.Datas.FirstOrDefault();
            if (retain == null)
            {
                return;
            }

            var packetId = _TopicCenter.CreatePacketIdentifier();
            var newPublish = new PublishPackage(retain.PublishPackage.TopicName, packetId, retain.PublishPackage.Body, subscribed.SubscribedQoS, false, true);
            if (subscribed.SubscribedQoS != QoSType.QoS0)
            {
                RepeatTask repeatTask = new RepeatTask(RepeatCount, RepeatInterval);
                CachePublishPackage cachePublish = new CachePublishPackage(newPublish, repeatTask);
                cachePublish.TaskFinish += CachePublish_TaskFinish;
                // Register before starting the task: if the task finished first,
                // CachePublish_TaskFinish would find nothing to remove and the entry
                // added afterwards would never be cleaned up.
                _TopicCenter.CachePackages.TryAdd(packetId, cachePublish);
                cachePublish.RunTask(() =>
                {
                    var repeatPublish = new PublishPackage(newPublish.TopicName, packetId, newPublish.Body, newPublish.QoS, true);
                    OnTransmit?.Invoke(repeatPublish);
                });
            }
            OnTransmit?.Invoke(newPublish);
        }

        /// <summary>
        /// Handles a publish on a subscribed topic: re-packages it with a fresh packet
        /// identifier at the subscription's QoS and forwards it via
        /// <see cref="OnTransmit"/>. For QoS &gt; 0 a repeat task is cached as well.
        /// </summary>
        /// <param name="package">Package published on the topic.</param>
        private void Topic_OnPublish(PublishPackage package)
        {
            // NOTE(review): the spin lock is held while OnTransmit handlers run. Confirm
            // that handlers are short and never re-enter this method on the same thread.
            bool voucher = false;
            try
            {
                _SpinLock.Enter(ref voucher);
                if (IsDispose)
                {
                    return;
                }

                var topic = Topics[package.TopicName];
                if (topic == null)
                {
                    return;
                }

                var packetId = _TopicCenter.CreatePacketIdentifier();
                var newPublish = new PublishPackage(package.TopicName, packetId, package.Body, topic.SubscribedQoS);
                if (topic.SubscribedQoS != QoSType.QoS0)
                {
                    RepeatTask repeatTask = new RepeatTask(RepeatCount, RepeatInterval);
                    CachePublishPackage cachePublish = new CachePublishPackage(newPublish, repeatTask);
                    cachePublish.TaskFinish += CachePublish_TaskFinish;
                    // Register before starting the task so a finish notification can
                    // always find (and dispose) the cached entry.
                    _TopicCenter.CachePackages.TryAdd(packetId, cachePublish);
                    cachePublish.RunTask(() =>
                    {
                        var repeatPublish = new PublishPackage(newPublish.TopicName, packetId, newPublish.Body, newPublish.QoS, true);
                        OnTransmit?.Invoke(repeatPublish);
                    });
                }
                OnTransmit?.Invoke(newPublish);
            }
            finally
            {
                if (voucher) _SpinLock.Exit();
            }
        }

        /// <summary>
        /// Called when a cached repeat task finishes: removes the package from the
        /// topic center's cache and disposes it.
        /// </summary>
        /// <param name="packetId">Packet identifier of the finished package.</param>
        private void CachePublish_TaskFinish(ushort packetId)
        {
            _TopicCenter.CachePackages.TryRemove(packetId, out CachePublishPackage task);
            task?.Dispose();
        }
    }
}
